/*
 * Copyright 2020 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.imagemgmt.version;

import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
import azkaban.imagemgmt.exception.ImageMgmtException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A JDBC based implementation for loading VersionSet from the backed db. This class keeps the copy
 * of entries in the version_set table locally as well to reduce the number of queries to the db.
 * <p>
 * This is a singleton class to be instantiated using dependency injection.
 */
@Singleton
public class JdbcVersionSetLoader implements VersionSetLoader {

  private static final Logger logger = LoggerFactory.getLogger(JdbcVersionSetLoader.class);

  private static final String SELECT_VSET_FROM_ID = "SELECT id, md5, json FROM "
      + "version_set WHERE id=?";
  private static final String SELECT_VSET_FROM_MD5 = "SELECT id, md5, json FROM "
      + "version_set WHERE md5=?";
  private static final String SELECT_ALL_VSET = "SELECT id, md5, json FROM "
      + "version_set";
  private static final String INSERT_VSET = "INSERT INTO version_set (md5, json) values (?,?)";
  private static final String DELETE_VSET = "DELETE FROM version_set WHERE md5=?";

  private final DatabaseOperator dbOperator;
  private final Map<String, VersionSet> md5ToVersionSet = new HashMap<>();
  private final Map<Integer, VersionSet> idToVersionSet = new HashMap<>();

  /**
   * This constructor should be instantiated using dependency injection only to make sure that that
   * class remains Singleton.
   * <p>
   * This class pre-loads all the versionSets from the db at the beginning and post that every
   * insert and deletion is both to db and the local maps
   *
   * @param dbOperator Interface to perform db related operations.
   */
  @Inject
  public JdbcVersionSetLoader(final DatabaseOperator dbOperator) throws ImageMgmtException {
    this.dbOperator = dbOperator;
    for (final VersionSet versionSet : fetchAllVersionSets()) {
      this.md5ToVersionSet.put(versionSet.getVersionSetMd5Hex(), versionSet);
      this.idToVersionSet.put(versionSet.getVersionSetId(), versionSet);
    }
  }

  /**
   * This method inserts the versionSetJsonString and versionSetMd5Hex to the Table version_set and
   * get the versionSetId autogenerated post insertion.
   */
  @Override
  public synchronized Optional<VersionSet> insertAndGetVersionSet(final String versionSetMd5Hex,
      final String versionSetJsonString) throws ImageMgmtException {
    final SQLTransaction<Integer> insertVersionSet = transOperator -> {
      final VersionSetHandler versionSetHandler = new VersionSetHandler();
      // Check if the versionSet already exists for versionSetMd5Hex
      final List<VersionSet> versionSets = transOperator
          .query(SELECT_VSET_FROM_MD5, versionSetHandler, versionSetMd5Hex);
      if (versionSets.size() > 1) {
        throw new SQLException(
            "Expected only one VersionSet for versionSetMd5Hex " + versionSetMd5Hex);
      }
      // Insert if it doesn't
      if (versionSets.isEmpty()) {
        transOperator.update(INSERT_VSET, versionSetMd5Hex, versionSetJsonString);
        return (int) transOperator.getLastInsertId();
      }
      return versionSets.get(0).getVersionSetId();
    };
    try {
      final Integer versionSetId = this.dbOperator.transaction(insertVersionSet);
      final VersionSet versionSet = new VersionSet(versionSetJsonString, versionSetMd5Hex,
          versionSetId);
      this.md5ToVersionSet.put(versionSetMd5Hex, versionSet);
      this.idToVersionSet.put(versionSetId, versionSet);
      return Optional.of(versionSet);
    } catch (final SQLException e) {
      logger.error("Exception occurred while inserting version set and getting version id", e);
      throw new ImageMgmtException("Unable to insert and get versionSetId", e);
    }
  }

  /**
   * Deletes the versionSet row from the table version_set corresponding to versionSetMd5Hex.
   */
  @Override
  public synchronized boolean deleteVersionSet(final String versionSetMd5Hex)
      throws ImageMgmtException {
    final SQLTransaction<Integer> insertVersionSet = transOperator -> transOperator
        .update(DELETE_VSET, versionSetMd5Hex);
    try {
      final Integer rowsDeleted = this.dbOperator.transaction(insertVersionSet);
      if (this.md5ToVersionSet.containsKey(versionSetMd5Hex)) {
        final VersionSet removedVersionSet = this.md5ToVersionSet.remove(versionSetMd5Hex);
        this.idToVersionSet.remove(removedVersionSet.getVersionSetId());
      }
      return rowsDeleted != 0;
    } catch (final SQLException e) {
      logger.error("Failed to execute: " + DELETE_VSET, e);
      throw new ImageMgmtException(
          "Exception occurred while removing version set for " + versionSetMd5Hex,
          e);
    }
  }

  /**
   * This method first checks if the versionSetId already exists in the local copy of the
   * versionSets, otherwise, it will insert the versionSet into the table and get the resulting
   * versionSetId.
   */
  @Override
  public synchronized Optional<VersionSet> getVersionSet(final String versionSetMd5Hex,
      final String versionSetJsonString)
      throws ImageMgmtException {
    if (this.md5ToVersionSet.containsKey(versionSetMd5Hex)) {
      final VersionSet versionSet = this.md5ToVersionSet.get(versionSetMd5Hex);
      return Optional.of(versionSet);
    }
    return insertAndGetVersionSet(versionSetMd5Hex, versionSetJsonString);
  }

  /**
   * This method first check if the versionSet exists in the local copy of versionSets, otherwise,
   * it gets the copy from the version_set table and updates the local copy.
   */
  @Override
  public synchronized Optional<VersionSet> getVersionSet(final String versionSetMd5Hex)
      throws ImageMgmtException {
    if (this.md5ToVersionSet.containsKey(versionSetMd5Hex)) {
      return Optional.of(this.md5ToVersionSet.get(versionSetMd5Hex));
    }
    final VersionSetHandler versionSetHandler = new VersionSetHandler();
    try {
      final List<VersionSet> versionSets = this.dbOperator
          .query(SELECT_VSET_FROM_MD5, versionSetHandler, versionSetMd5Hex);
      if (versionSets.size() > 1) {
        throw new ImageMgmtException(
            "Expected only one VersionSet for versionSetMd5Hex " + versionSetMd5Hex);
      }
      if (versionSets.isEmpty()) {
        logger.info("No VersionSet exists for versionSetMd5Hex " + versionSetMd5Hex);
        return Optional.empty();
      }
      final VersionSet versionSet = versionSets.get(0);
      this.idToVersionSet.put(versionSet.getVersionSetId(), versionSet);
      this.md5ToVersionSet.put(versionSetMd5Hex, versionSet);
      return Optional.of(versionSet);
    } catch (final SQLException e) {
      logger.error("Failed to execute: " + SELECT_VSET_FROM_MD5, e);
      throw new ImageMgmtException(
          "Failed to fetch VersionSet for versionSetMd5Hex " + versionSetMd5Hex,
          e);
    }
  }

  /**
   * This method first check if the versionSet exists in the local copy of versionSets, otherwise,
   * it gets the copy from the version_set table and updates the local copy.
   */
  @Override
  public synchronized Optional<VersionSet> getVersionSetById(final int versionSetId)
      throws ImageMgmtException {
    if (this.idToVersionSet.containsKey(versionSetId)) {
      return Optional.of(this.idToVersionSet.get(versionSetId));
    }
    final VersionSetHandler versionSetHandler = new VersionSetHandler();
    try {
      final List<VersionSet> versionSets = this.dbOperator
          .query(SELECT_VSET_FROM_ID, versionSetHandler, versionSetId);
      if (versionSets.size() > 1) {
        throw new ImageMgmtException(
            "Expected only one VersionSet for versionSetId " + versionSetId);
      }
      if (versionSets.isEmpty()) {
        logger.info("No VersionSet exists for versionSetId " + versionSetId);
        return Optional.empty();
      }
      final VersionSet versionSet = versionSets.get(0);
      this.idToVersionSet.put(versionSetId, versionSet);
      this.md5ToVersionSet.put(versionSet.getVersionSetMd5Hex(), versionSet);
      return Optional.of(versionSet);
    } catch (final SQLException e) {
      logger.error("Failed to execute: " + SELECT_VSET_FROM_ID, e);
      throw new ImageMgmtException("Failed to fetch VersionSet for versionSetId " + versionSetId,
          e);
    }
  }

  /**
   * Fetches all the rows from the version_set table and returns it as List of VersionSet.
   */
  @Override
  public synchronized List<VersionSet> fetchAllVersionSets() throws ImageMgmtException {
    final VersionSetHandler versionSetHandler = new VersionSetHandler();
    try {
      return this.dbOperator.query(SELECT_ALL_VSET, versionSetHandler);
    } catch (final SQLException e) {
      logger.error("Failed to execute: " + SELECT_ALL_VSET, e);
      throw new ImageMgmtException("Failed to fetch all VersionSets ", e);
    }
  }

  /**
   * A Handler class to modify the result set into List of VersionSet.
   */
  public static class VersionSetHandler implements ResultSetHandler<List<VersionSet>> {

    public static final int VERSIONSET_ID_IDX = 1;
    public static final int VERSIONSET_MD5_IDX = 2;
    public static final int VERSIONSET_JSON_IDX = 3;

    @Override
    public List<VersionSet> handle(final ResultSet rs) throws SQLException {
      final List<VersionSet> versionSets = new ArrayList<>();
      while (rs.next()) {
        final int versionSetId = rs.getInt(VERSIONSET_ID_IDX);
        final String versionSetMd5Hex = rs.getString(VERSIONSET_MD5_IDX);
        final String versionSetJsonString = rs.getString(VERSIONSET_JSON_IDX);
        try {
          versionSets.add(new VersionSet(versionSetJsonString, versionSetMd5Hex, versionSetId));
        } catch (final Exception e) {
          throw new SQLException(e);
        }
      }
      return versionSets;
    }
  }
}
